package com.gcloud.mesh.dcs.dataclean;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

import javax.annotation.PostConstruct;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.gcloud.mesh.dcs.dataclean.consumer.ConsumerManager;
import com.gcloud.mesh.dcs.dataclean.consumer.DeleteConsumer;
import com.gcloud.mesh.dcs.dataclean.consumer.IConsumer;
import com.gcloud.mesh.dcs.dataclean.consumer.ReplaceConsumer;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class DataCleanManager {
	
	private Queue<Object> inputQueues = new ConcurrentLinkedQueue<Object>();
	
	private Queue<Object> outputQueues = new ConcurrentLinkedQueue<Object>();
	
	private static final String OPERATE_REPLACE = "replace";
	
	private static final String OPERATE_DELETE = "delete";
	
	private volatile boolean started = false;
	
	private Map<String, ConsumerManager> consumerMap = new ConcurrentHashMap<String, ConsumerManager>();
	
	@Autowired
	private ConsumerManager consumerManager;
	
	public void init(String operateType) {
		started = true;
		if(this.OPERATE_DELETE.equals(operateType)) {
			DeleteConsumer deleteConsumer = new DeleteConsumer();
			consumerManager.addChain(deleteConsumer);
		}
		if(this.OPERATE_REPLACE.equals(operateType)) {
			ReplaceConsumer replaceConsumer = new ReplaceConsumer();
			consumerManager.addChain(replaceConsumer);
		}
	}
	
	public Object syncProcess(Object obj) {
		Object outputObj = obj;
		if(isStarted()) {
			
			if(obj != null) {
				try {
					outputObj = consumerManager.process(obj);
				} catch(Exception e) {
					log.info("[DataCleanManager] consumerManager 调用process失败:{}", e.getMessage());
				}
			}
			
		}else {
			log.info("[DataCleanManager] dataCleanManager 未调用init()方法初始化");
		}
		return outputObj;
	}
	
	private boolean isStarted() {
		return started;
	}
	
	public synchronized void reset() {
		consumerManager.resetChain();
	}
	
	public void input(Object msg) {
		// 谨慎使用，待优化
		inputQueues.add(msg);
	}
	
	public void start() {
		System.out.println("here!!!");
		new Thread(new Runnable() {

			@Override
			public void run() {
				// TODO Auto-generated method stub
				doExecute();
			}
			
		}).start();
	}
	
	private void doExecute() {
		// 谨慎使用，待优化
		while(true) {
			if(inputQueues != null && !inputQueues.isEmpty()) {
				try {
					Object obj = inputQueues.poll();
					if(obj != null) {
						//用线程池
						Object handledObj = consumerManager.process(obj);
				        if(handledObj != null) {
				        	outputQueues.add(handledObj);
				        }
					}
				} catch(Exception e) {
					
				}
			}
		}
	}
	
	public Object output() {
		// 谨慎使用，待优化
		return outputQueues.poll();
	}
}
